package biz.ws.ws.sender;

import biz.ws.ws.Message;
import biz.ws.ws.MsgHolder;
import biz.ws.ws.MsgSender;
import com.alibaba.fastjson.JSON;
import org.springframework.kafka.core.KafkaTemplate;

/**
 * @创建人 wsdc
 * @时间 2021/4/4
 * @描述 其他服务使用这个将消息发送到kafka中
 */
public class ServiceKafkaSender implements MsgSender {
    KafkaTemplate kafka;
    int partitions;
    String topic;

    public ServiceKafkaSender(KafkaTemplate kafka, int partitions, String topic) {
        this.kafka = kafka;
        this.partitions = partitions;
        this.topic = topic;
    }

    @Override
    public void send(MsgHolder context) {
        Message message = context.getMessage();
        Long toId = message.getToId();

        if(toId == null){
            return ;
        }

        //  id取模 决定分区
        kafka.send(topic,toId % partitions, JSON.toJSONString(message));
    }
}
